// Copyright 2024 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package botapi

import (
	"context"
	"fmt"
	"slices"
	"strings"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"go.chromium.org/luci/common/clock"
	"go.chromium.org/luci/common/data/rand/mathrand"
	"go.chromium.org/luci/common/errors"
	"go.chromium.org/luci/common/logging"
	"go.chromium.org/luci/common/retry/transient"

	configpb "go.chromium.org/luci/swarming/proto/config"
	internalspb "go.chromium.org/luci/swarming/proto/internals"
	"go.chromium.org/luci/swarming/server/botinfo"
	"go.chromium.org/luci/swarming/server/botsession"
	"go.chromium.org/luci/swarming/server/botsrv"
	"go.chromium.org/luci/swarming/server/botstate"
	"go.chromium.org/luci/swarming/server/cfg"
	"go.chromium.org/luci/swarming/server/model"
	"go.chromium.org/luci/swarming/server/validate"
)

// PollCommand instructs the bot what to do after it calls /bot/poll.
//
// It is serialized as a plain string in the `cmd` field of PollResponse.
type PollCommand string

const (
	// PollRBE tells the bot it should open and run an RBE session.
	//
	// Accompanied by PollResponse.RBE with the RBE instance to use.
	PollRBE PollCommand = "rbe"
	// PollSleep tells the bot it should shutdown the RBE session and sleep.
	//
	// Accompanied by PollResponse.Duration (seconds).
	PollSleep PollCommand = "sleep"
	// PollUpdate tells the bot it should self-update.
	//
	// Accompanied by PollResponse.Version (the bot archive digest to fetch).
	PollUpdate PollCommand = "update"
	// PollBotRestart tells the bot process to restart.
	//
	// Accompanied by PollResponse.Message explaining the reason.
	PollBotRestart PollCommand = "bot_restart"
)

// PollRequest is sent by the bot as the JSON body of a /bot/poll call.
type PollRequest struct {
	// Session is a serialized Swarming Bot Session proto.
	//
	// Very old bots may not send it at all. A missing or broken session still
	// lets the request proceed far enough for the bot to be told to self-update
	// or restart.
	Session []byte `json:"session"`

	// Dimensions are the current dimensions collected by the bot.
	//
	// At least `id` dimension must be set. It is the bot ID. Malformed
	// dimensions result in the bot being quarantined rather than the request
	// being rejected.
	Dimensions map[string][]string `json:"dimensions"`

	// State is (mostly) arbitrary JSON dict with various properties of the bot.
	//
	// Values here are not indexed and they do not affect how tasks are scheduled
	// on the bot. The server is still aware of some keys and checks them to
	// decide how to handle the bot calls.
	State botstate.Dict `json:"state"`

	// Version is the bot's own version.
	//
	// It is a digest of the running bot archive. Used to detect if the bot should
	// self-update. Required: an empty version quarantines the bot.
	Version string `json:"version"`

	// RequestUUID is used to skip reporting duplicate events on retries.
	//
	// Generated by the client (usually an UUID4 string). Optional.
	RequestUUID string `json:"request_uuid,omitempty"`
}

// ExtractSession returns the serialized session token sent by the bot, as is.
func (r *PollRequest) ExtractSession() []byte { return r.Session }

// ExtractDebugRequest returns a copy of the request suitable for debug output.
//
// The copy carries every field except the session token. Fields are listed
// explicitly (instead of copying *r wholesale), so a newly added field is left
// out of the debug copy until it is added here.
func (r *PollRequest) ExtractDebugRequest() any {
	dbg := PollRequest{
		Dimensions:  r.Dimensions,
		State:       r.State,
		Version:     r.Version,
		RequestUUID: r.RequestUUID,
	}
	return &dbg
}

// PollResponse is returned by the server as the JSON reply to /bot/poll.
//
// Only the fields relevant to Cmd are populated; the rest are omitted from the
// JSON output via `omitempty`.
type PollResponse struct {
	// Cmd instructs the bot what to do next.
	Cmd PollCommand `json:"cmd"`

	// Session is an updated serialized Swarming Bot Session proto.
	//
	// Left empty when the bot didn't present a valid session (e.g. when asking
	// it to self-update, or to restart because its session is broken).
	Session []byte `json:"session,omitempty"`

	// Duration is how long to sleep (in seconds) for PollSleep command.
	Duration float64 `json:"duration,omitempty"`

	// Version is the bot version to update to for PollUpdate command.
	Version string `json:"version,omitempty"`

	// Message is why the bot is restarting for PollBotRestart command.
	Message string `json:"message,omitempty"`

	// RBE contains parameters of the RBE session for PollRBE command.
	RBE *BotRBEParams `json:"rbe,omitempty"`
}

// Poll implements the handler that collects information about the bot.
//
// In the past it also used to tell the bot what to execute (the bot was
// "polling for work") but this is now done through RBE session endpoints. Now
// this handler is more of a "heartbeat" handler, called occasionally by the bot
// when it is in-between tasks.
//
// This handler does the following:
//   - Registers heartbeats from the bot (updating BotInfo.LastSeenTs).
//   - Updates the bot state in the datastore (dimensions, state, health).
//   - Refreshes the bot's session token.
//   - Issues self-update and restart commands.
//   - Tells the bot what RBE instance to use.
//
// Checks are applied in order: self-update, session validity, stale handshake
// configs, then health (quarantine/maintenance). The first check that triggers
// determines the command returned to the bot. Every returned command is also
// recorded as a BotInfo event by pollResponse.
func (srv *BotAPIServer) Poll(ctx context.Context, body *PollRequest, _ *botsrv.Request) (botsrv.Response, error) {
	// Parse and validate the request.
	pr, err := srv.processPoll(ctx, body)
	if err != nil {
		return nil, err
	}

	// Ask outdated bots to self-update if necessary. Note we do that even for
	// "broken" requests and requests without a session token. Presumably updating
	// the bot will fix it.
	if pr.version != "" {
		channel, botArchive := pr.conf.BotChannel(pr.botID)
		if pr.version != botArchive.Digest {
			logging.Infof(ctx, "Using release channel %q", channel)
			return srv.pollResponse(ctx, pr, &PollResponse{
				Cmd:     PollUpdate,
				Version: botArchive.Digest,
			})
		}
	}

	// Need a valid session to proceed.
	if pr.session == nil {
		if pr.sessionBroken {
			// The bot sent some session token, but it was invalid (likely expired).
			// Ask the bot to restart to get a new session.
			return srv.pollResponse(ctx, pr, &PollResponse{
				Cmd:     PollBotRestart,
				Message: "Bot session expired",
			})
		}
		// The bot didn't send a session token at all, even though it runs the
		// most recent version. Something is badly broken. Quarantine it and ask to
		// sleep for a relatively long time (since it is unlikely the problem will
		// fix itself quickly).
		pr.validationErr(ctx, errors.New("no session token"))
		return srv.pollResponse(ctx, pr, &PollResponse{
			Cmd:      PollSleep,
			Duration: randomDuration(ctx, 5*time.Minute, 10*time.Minute),
		})
	}

	// Update the session, in particular BotConfig embedded there.
	sessionToken, err := botsession.Marshal(botsession.Update(pr.session, botsession.SessionParameters{
		BotGroup:          pr.group,
		RBEConfig:         pr.rbeConf,
		RBEEffectiveBotID: pr.rbeEffectiveBotID(),
		ServerConfig:      pr.conf,
		DebugInfo:         botsession.DebugInfo(ctx, srv.version),
		Now:               clock.Now(ctx),
	}), srv.hmacSecret)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "fail to marshal session proto: %s", err)
	}

	// Ask the bot to restart to pick up new configs consumed only during the
	// handshake (like the bot hooks or custom server-assigned dimensions), if
	// they have changed since the bot created the session.
	if botsession.IsHandshakeConfigStale(pr.session, pr.group) {
		return srv.pollResponse(ctx, pr, &PollResponse{
			Cmd:     PollBotRestart,
			Session: sessionToken,
			Message: "Restarting to pick up new bots.cfg config",
		})
	}

	// If the bot is quarantined or in maintenance, ask it to sleep. Note that no
	// validationErr calls are allowed past this point, since health info gets
	// computed (and frozen) here.
	switch healthInfo, err := pr.calculateHealthInfo(); {
	case err != nil:
		return nil, err
	case healthInfo.Quarantined != "":
		// Sleep longer, quarantined bots are usually broken for a long time and
		// there's no need to hammer the server.
		return srv.pollResponse(ctx, pr, &PollResponse{
			Cmd:      PollSleep,
			Session:  sessionToken,
			Duration: randomDuration(ctx, time.Minute, 4*time.Minute),
		})
	case healthInfo.Maintenance != "":
		// Sleep less, maintenance is expected and should fix itself quickly.
		return srv.pollResponse(ctx, pr, &PollResponse{
			Cmd:      PollSleep,
			Session:  sessionToken,
			Duration: randomDuration(ctx, 10*time.Second, time.Minute),
		})
	}

	// The bot is totally healthy. Tell it to open or maintain the RBE session.
	return srv.pollResponse(ctx, pr, &PollResponse{
		Cmd:     PollRBE,
		Session: sessionToken,
		RBE: &BotRBEParams{
			Instance: pr.rbeConf.Instance,
		},
	})
}

// pollRequest contains information extracted from the poll request.
//
// Note the request may still be pretty broken. It will still be accepted, since
// we need to process it to record the bot as quarantined (for sending broken
// requests). Otherwise such broken requests are hard to surface.
//
// Produced by processPoll and consumed by Poll and pollResponse.
type pollRequest struct {
	botID string // ID of the bot extracted from the session or dimensions

	errs errors.MultiError // all request validation errors, if any; non-empty errs quarantine the bot

	session       *internalspb.Session // validated session or nil if missing or broken
	sessionBroken bool                 // true if there is a session token, but it is broken

	version     string        // the bot version from the request, if known
	requestUUID string        // the request UUID for idempotency, if known
	state       botstate.Dict // validated state dict or an empty dict if broken; may be replaced by calculateHealthInfo()
	dims        []string      // bot's sorted "k:v" dimensions (with server-enforced ones applied) or nil if they are invalid

	conf  *cfg.Config   // the config snapshot used to authorize the bot
	group *cfg.BotGroup // the group the bot belongs to

	rbeConf        cfg.RBEConfig                  // the matching RBEConfig
	effectiveBotID *botinfo.RBEEffectiveBotIDInfo // derived from dimensions and rbeConf; nil if it couldn't be derived

	quarantined []string            // values of "quarantined" dimension (set even if `dims` are nil)
	healthInfo  *botinfo.HealthInfo // populated lazily in calculateHealthInfo(); once set, validationErr() panics
}

// validationErr records a validation error for the request and logs it.
//
// Each recorded error contributes to quarantining the bot. All such errors must
// be recorded before the health info is computed: once calculateHealthInfo() has
// run, calling this is a programming error and it panics.
func (pr *pollRequest) validationErr(ctx context.Context, err error) {
	if pr.healthInfo == nil {
		logging.Errorf(ctx, "Quarantining %s: %s", pr.botID, err)
		pr.errs = append(pr.errs, err)
		return
	}
	panic("validationErr called after calculateHealthInfo")
}

// calculateHealthInfo returns the bot's HealthInfo, computing it on first use.
//
// The result is memoized in pr.healthInfo. The first computation may also
// replace `pr.state` (e.g. when validation errors quarantine the bot).
//
// Returns an internal gRPC error if the state dict can't be updated.
func (pr *pollRequest) calculateHealthInfo() (*botinfo.HealthInfo, error) {
	if pr.healthInfo != nil {
		return pr.healthInfo, nil
	}
	info, newState, err := updateBotHealthInfo(pr.state, pr.quarantined, pr.errs)
	if err != nil {
		return nil, status.Errorf(codes.Internal, "failed to update the bot state dict: %s", err)
	}
	pr.healthInfo, pr.state = &info, newState
	return pr.healthInfo, nil
}

// rbeEffectiveBotID returns the RBE effective bot ID to put into the session.
//
// If a new effective bot ID was derived from the request, it is returned.
// Otherwise the one currently recorded in the session's BotConfig is kept.
// Returns "" if neither is available (e.g. there is no valid session). The
// proto getters make this nil-safe.
func (pr *pollRequest) rbeEffectiveBotID() string {
	if pr.effectiveBotID != nil {
		return pr.effectiveBotID.RBEEffectiveBotID
	}
	// Don't change it if we failed to calculate the new value.
	return pr.session.GetBotConfig().GetRbeEffectiveBotId()
}

// processPoll parses poll arguments, does validation and authorization.
//
// If the request is authorized, but has invalid dimensions or state, returns
// *pollRequest with `errs` inside. The bot will be put into quarantine in that
// case.
//
// Returns a gRPC error (and no *pollRequest) if the request can't be processed
// at all. This happens when:
//   - the "id" dimension can't be read;
//   - the server config can't be fetched;
//   - the bot fails authorization.
func (srv *BotAPIServer) processPoll(ctx context.Context, body *PollRequest) (*pollRequest, error) {
	// Bots are expected to send session tokens. But very old bots don't. We
	// should tolerate missing token to let old bots to self-update. The same for
	// expired or broken session tokens (except we'll ask the bot to restart to
	// get a new token).
	var session *internalspb.Session
	var sessionBroken bool
	if len(body.Session) != 0 {
		var err error
		session, err = botsession.CheckSessionToken(body.Session, srv.hmacSecret, clock.Now(ctx))
		if err != nil {
			logging.Warningf(ctx, "Bad session token: %s", status.Convert(err).Message())
			if session != nil {
				botsession.LogSession(ctx, session)
			}
			sessionBroken = true
			session = nil
		}
	}

	// There must be "id" dimension. We'll validate the rest of dimensions later.
	botIDFromDims, err := peekBotID(body.Dimensions)
	if err != nil {
		return nil, err
	}

	// If have a valid session, use bot ID from it. Otherwise fallback to
	// dimensions. Both should be equal in correctly functioning bots. We'll check
	// that later.
	var botID string
	if session != nil {
		botID = session.BotId
	} else {
		botID = botIDFromDims
	}

	// If have a valid session, use its LastSeenConfig to force fetch the correct
	// version of the server config. Otherwise the bot might observe config going
	// back in time if it hits a backend with an older cached config.
	var conf *cfg.Config
	if session != nil {
		var err error
		conf, err = srv.cfg.FreshEnough(ctx, session.LastSeenConfig.AsTime())
		if err != nil {
			return nil, status.Errorf(codes.Internal, "failed to fetch service config: %s", err)
		}
	} else {
		conf = srv.cfg.Cached(ctx)
	}

	// Authorize the bot. Note that unlike most other handlers, we use the current
	// server config here (not the snapshot stored in the session), because Poll
	// handler is where this snapshot is actually refreshed.
	group := conf.BotGroup(botID)
	if err := srv.authorizeBot(ctx, botID, group.Auth); err != nil {
		if transient.Tag.In(err) {
			return nil, status.Errorf(codes.Internal, "transient error checking bot credentials: %s", err)
		}
		return nil, status.Errorf(codes.Unauthenticated, "the bot is not in bots.cfg or wrong credentials passed: %s", err)
	}

	// Validate dimensions. We'll quarantine the bot if they are malformed.
	var errs errors.MultiError
	for _, err := range validate.BotDimensions(body.Dimensions) {
		errs = append(errs, errors.WrapIf(err, "bad dimensions"))
	}
	if session != nil && session.BotId != botIDFromDims {
		errs = append(errs, errors.Fmt(`"id" dimension %q doesn't match bot ID in the session %q`, botIDFromDims, session.BotId))
	}

	// If dimensions are valid, apply server-enforced dimensions on top. This is
	// a security-critical feature, since "pool" is one of such server-enforced
	// dimensions.
	//
	// Note `dims` will remain nil if dimensions are broken. It is an indication
	// that they should not be stored in BotInfo.
	var dims []string
	if len(errs) == 0 {
		// Bot-supplied keys are dropped entirely if the server enforces the same
		// key, so the bot can't add extra values to e.g. "pool".
		for key, vals := range body.Dimensions {
			if _, set := group.Dimensions[key]; !set {
				for _, val := range vals {
					dims = append(dims, fmt.Sprintf("%s:%s", key, val))
				}
			}
		}
		for key, vals := range group.Dimensions {
			for _, val := range vals {
				dims = append(dims, fmt.Sprintf("%s:%s", key, val))
			}
		}
		// Map iteration order is random; sort for a deterministic result.
		slices.Sort(dims)
	}

	// Validate the state is a correct JSON. We'll quarantine the bot if it isn't.
	state := body.State
	if err := state.Unseal(); err != nil {
		errs = append(errs, errors.Fmt("bad state dict: %w", err))
		state = botstate.Dict{}
	}

	// A version string is required.
	if body.Version == "" {
		errs = append(errs, errors.New("no `version` in the request"))
	}

	// The bot must be in the RBE mode with the RBE instance configured.
	rbeConf, err := conf.RBEConfig(botID)
	switch {
	case err != nil:
		errs = append(errs, errors.Fmt("conflicting RBE config: %w", err))
	case rbeConf.Instance == "":
		errs = append(errs, errors.New("RBE is not configured"))
	case rbeConf.Mode != configpb.Pool_RBEMigration_BotModeAllocation_RBE:
		errs = append(errs, errors.Fmt("unsupported RBE mode %s", rbeConf.Mode))
	}

	// Derive the effective bot ID to use in RBE sessions. This fails if the
	// effective bot ID dimension has more than one value. Note need to use `dims`
	// here (not body.Dimensions) to make sure we pick up server-enforced
	// dimensions applied to it.
	//
	// Do not change the current effective bot ID if dimensions are broken or
	// a new one can't be derived.
	var effectiveBotID *botinfo.RBEEffectiveBotIDInfo
	if len(dims) != 0 {
		if rbeConf.EffectiveBotIDDimension == "" {
			effectiveBotID = &botinfo.RBEEffectiveBotIDInfo{} // use default bot ID
		} else {
			rbeEffectiveBotID, err := deriveRBEEffectiveBotID(dims, rbeConf.EffectiveBotIDDimension)
			if err != nil {
				errs = append(errs, errors.Fmt("RBE effective bot ID: %w", err))
			} else {
				effectiveBotID = &botinfo.RBEEffectiveBotIDInfo{
					RBEEffectiveBotID: rbeEffectiveBotID,
				}
			}
		}
	}

	// Log all errors for easier debugging.
	for _, err := range errs {
		logging.Errorf(ctx, "Validation error: %s", err)
	}

	return &pollRequest{
		botID:          botID,
		errs:           errs,
		session:        session,
		sessionBroken:  sessionBroken,
		version:        body.Version,
		requestUUID:    body.RequestUUID,
		state:          state,
		dims:           dims,
		conf:           conf,
		rbeConf:        rbeConf,
		effectiveBotID: effectiveBotID,
		group:          group,
		quarantined:    body.Dimensions[botstate.QuarantinedKey],
	}, nil
}

// pollResponse completes the request by recording the BotInfo event matching
// the command in the PollResponse.
//
// It also finalizes the bot's health info, logging whether the bot is
// quarantined or in maintenance. It then submits a botinfo.Update with the
// request's dimensions, state and call info.
//
// On success returns `resp` itself. Returns an internal gRPC error if the
// health info can't be calculated or the BotInfo update fails. Panics on an
// unknown resp.Cmd (a programming error).
func (srv *BotAPIServer) pollResponse(ctx context.Context, pr *pollRequest, resp *PollResponse) (botsrv.Response, error) {
	// Note: must do it before checking pr.state (including the "rbe_idle" read
	// below), since calculateHealthInfo may update pr.state as a side effect.
	healthInfo, err := pr.calculateHealthInfo()
	if err != nil {
		return nil, err
	}

	var ev model.BotEventType
	switch resp.Cmd {
	case PollRBE:
		if effectiveID := pr.rbeEffectiveBotID(); effectiveID != "" {
			logging.Infof(ctx, "Asking %q to run RBE session at %q with bot ID %q", pr.botID, resp.RBE.Instance, effectiveID)
		} else {
			logging.Infof(ctx, "Asking %q to run RBE session at %q", pr.botID, resp.RBE.Instance)
		}
		if pr.state.MustReadBool("rbe_idle") {
			ev = model.BotEventIdle
		} else {
			ev = model.BotEventPolling
		}
	case PollSleep:
		logging.Infof(ctx, "Asking %q to sleep for %.1f sec", pr.botID, resp.Duration)
		ev = model.BotEventSleep
	case PollUpdate:
		logging.Infof(ctx, "Asking %q to update: %s => %s", pr.botID, pr.version, resp.Version)
		ev = model.BotEventUpdate
	case PollBotRestart:
		logging.Infof(ctx, "Asking %q to restart: %s", pr.botID, resp.Message)
		ev = model.BotEventRestart
	default:
		panic(fmt.Sprintf("unknown resp.Cmd %s", resp.Cmd))
	}

	switch {
	case healthInfo.Quarantined != "":
		logging.Warningf(ctx, "The bot %q is quarantined: %s", pr.botID, healthInfo.Quarantined)
	case healthInfo.Maintenance != "":
		logging.Infof(ctx, "The bot %q is in maintenance: %s", pr.botID, healthInfo.Maintenance)
	}

	// A nil State in the update means "leave the stored state as is".
	var botState *botstate.Dict
	if !pr.state.IsEmpty() {
		botState = &pr.state
	}

	update := &botinfo.Update{
		BotID:         pr.botID,
		EventType:     ev,
		EventDedupKey: pr.requestUUID,
		EventMessage:  resp.Message,
		TasksManager:  srv.tasksManager,
		Dimensions:    pr.dims, // will be nil if the request is broken, meaning do not update dims in BotInfo
		State:         botState,
		CallInfo: botCallInfo(ctx, &botinfo.CallInfo{
			SessionID: pr.session.GetSessionId(),
			Version:   pr.version,
		}),
		BotGroupInfo: &botinfo.BotGroupInfo{
			Dimensions: pr.group.Dimensions,
			Owners:     pr.group.Owners,
		},
		HealthInfo:         healthInfo,
		EffectiveBotIDInfo: pr.effectiveBotID,
	}
	if err := srv.submitUpdate(ctx, update); err != nil {
		return nil, status.Errorf(codes.Internal, "failed to update bot info: %s", err)
	}
	return resp, nil
}

// deriveRBEEffectiveBotID derives the effective bot ID to use in RBE sessions.
//
// `dims` are "key:value" strings. The bot must have exactly one "pool" value
// and at most one value of the `effectiveDimKey` dimension; otherwise an error
// is returned. Panics if there is no "pool" dimension at all.
//
// May return ("", nil) if it is fine to keep using the real bot ID (i.e. the
// effective bot ID dimension is not set).
func deriveRBEEffectiveBotID(dims []string, effectiveDimKey string) (string, error) {
	pool := ""
	effectiveVal := ""
	for _, kv := range dims {
		key, val, _ := strings.Cut(kv, ":")
		if key == "pool" {
			if pool != "" {
				return "", errors.New("bots using effective Bot ID feature cannot belong to multiple pools")
			}
			pool = val
		} else if key == effectiveDimKey {
			if effectiveVal != "" {
				return "", errors.Fmt("effective bot ID dimension %q must have only one value", effectiveDimKey)
			}
			effectiveVal = val
		}
	}

	switch {
	case pool == "":
		panic("impossible bot without a pool")
	case effectiveVal == "":
		return "", nil
	default:
		return model.RBEEffectiveBotID(pool, effectiveDimKey, effectiveVal), nil
	}
}

// randomDuration returns a random duration in the half-open range [min, max).
//
// Returns it as a float number of seconds. Used to add jitter to sleep
// commands so bots don't poll in lockstep.
//
// If the range is empty (max <= min), returns min instead of calling Int63n
// with a non-positive bound, which panics.
func randomDuration(ctx context.Context, min, max time.Duration) float64 {
	if max <= min {
		return min.Seconds()
	}
	jitter := time.Duration(mathrand.Int63n(ctx, int64(max-min)))
	return (min + jitter).Seconds()
}
